//https://blog.csdn.net/qq_52462620/article/details/141405596  https://blog.csdn.net/qq_30614345/article/details/134631436
package cn.com.dragonpass.infra.settle.service.impl;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

@Service
public class MultiThreadTransactionManager {

    @Autowired
    PlatformTransactionManager dataSourceTransactionManager;



    /**
     * 用于判断子线程业务是否处理完成
     * 处理完成时threadCountDownLatch的值为0
     */
    private CountDownLatch threadCountDownLatch;

    /**
     * 用于等待子线程全部完成后,子线程统一进行提交和回滚
     * 进行提交和回滚时mainCountDownLatch的值为0
     */
    private final CountDownLatch mainCountDownLatch = new CountDownLatch(1);

    /**
     * 是否提交事务,默认是true,当子线程有异常发生时,设置为false,回滚事务
     */
    private  AtomicBoolean isSubmit = new AtomicBoolean(true);

    public boolean execute(List<Runnable> runnableList) {
        // 超时时间
        long timeout = 30;
        setThreadCountDownLatch(4);//4个核心线程
        ExecutorService executorService = ExecutorConfig.getThreadPool();
        isSubmit.set(true);
//        for (Runnable runnable : runnableList) {
//            executorService.execute(()->{
//                executeThread(runnable, threadCountDownLatch, mainCountDownLatch, isSubmit);
//            });
//        }
        runnableList.forEach(runnable -> executorService.execute(() -> executeThread(runnable, threadCountDownLatch, mainCountDownLatch, isSubmit)));
        // 等待子线程全部执行完毕
        try {
            // 若计数器变为零了,则返回 true
            boolean isFinish = threadCountDownLatch.await(timeout, TimeUnit.SECONDS);
            if (!isFinish) {
                // 如果还有为执行完成的就回滚
                isSubmit.set(false);
                System.out.println("存在子线程在预期时间内未执行完毕,任务将全部回滚");
            }
        } catch (Exception exception) {
            System.out.println("主线程发生异常,异常为: " + exception.getMessage());
        } finally {
            // 计数器减1,代表该主线程执行完毕
            mainCountDownLatch.countDown();
        }
        // 返回结果,是否执行成功,事务提交即为执行成功,事务回滚即为执行失败
        return isSubmit.get();
    }

    private void executeThread(Runnable runnable, CountDownLatch threadCountDownLatch, CountDownLatch mainCountDownLatch, AtomicBoolean isSubmit) {
        System.out.println("子线程: [" + Thread.currentThread().getName() + "]");
        // 判断别的子线程是否已经出现错误,错误别的线程已经出现错误,那么所有的都要回滚,这个子线程就没有必要执行了
        if (!isSubmit.get()) {
            System.out.println("整个事务中有子线程执行失败需要回滚, 子线程: [" + Thread.currentThread().getName() + "] 终止执行");
            // 计数器减1,代表该子线程执行完毕
            threadCountDownLatch.countDown();
            return;
        }
        // 开启事务
        DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition);
        try {
            // 执行业务逻辑
            runnable.run();
            System.out.println("子线程: [" + Thread.currentThread().getName() + "]执行........");
        } catch (Exception exception) {
            // 发生异常需要进行回滚,设置isSubmit为false
            isSubmit.set(false);
            System.out.println("子线程: [" + Thread.currentThread().getName() + "]执行业务发生异常,异常为: " + exception.getMessage());
        } finally {
            // 计数器减1,代表该子线程执行完毕
            threadCountDownLatch.countDown();
        }
        try {
            // 等待主线程执行
            mainCountDownLatch.await();
            System.out.println("************************************");
        } catch (Exception exception) {
            System.out.println("子线程: [" + Thread.currentThread().getName() + "]等待提交或回滚异常,异常为: " + exception.getMessage());
        }
        try {
            // 提交
            if (isSubmit.get()) {
                dataSourceTransactionManager.commit(transactionStatus);
                System.out.println("子线程: [" + Thread.currentThread().getName() + "]进行事务提交");
            } else {
                dataSourceTransactionManager.rollback(transactionStatus);
                System.out.println("子线程: [" + Thread.currentThread().getName() + "]进行事务回滚");
            }
        } catch (Exception exception) {
            System.out.println("子线程: [" + Thread.currentThread().getName() + "]进行事务提交或回滚出现异常,异常为:" + exception.getMessage());
        }
    }

    private void setThreadCountDownLatch(int num) {
        this.threadCountDownLatch = new CountDownLatch(num);
    }



    private void executeThread2(ExecutorService executorService, CountDownLatch threadCountDownLatch, CountDownLatch mainCountDownLatch, AtomicBoolean isSubmit) throws InterruptedException {

        // 开启事务


            // 执行业务逻辑
            executorService.execute(()->{
                DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
                TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition);
                try {

                    dataSourceTransactionManager.commit(transactionStatus);
                    System.out.println("子线程: [" + Thread.currentThread().getName() + "]进行事务提交");
                   System.out.println("子线程: [" + Thread.currentThread().getName() + "]执行........");
            } catch (Exception exception) {
                // 发生异常需要进行回滚,设置isSubmit为false
                isSubmit.set(false);
                System.out.println("子线程: [" + Thread.currentThread().getName() + "]执行业务发生异常,异常为: " + exception.getMessage());
                    dataSourceTransactionManager.rollback(transactionStatus);
                    System.out.println("子线程: [" + Thread.currentThread().getName() + "]进行事务回滚");
            } finally {
                // 计数器减1,代表该子线程执行完毕
                threadCountDownLatch.countDown();
            }






            });

        boolean isFinish = threadCountDownLatch.await(30, TimeUnit.SECONDS);
        if (!isFinish) {
            // 如果还有为执行完成的就回滚
            isSubmit.set(false);
            System.out.println("存在子线程在预期时间内未执行完毕,任务将全部回滚");
        }












        try {
            // 等待主线程执行
            mainCountDownLatch.await();
            System.out.println("************************************");
        } catch (Exception exception) {
            System.out.println("子线程: [" + Thread.currentThread().getName() + "]等待提交或回滚异常,异常为: " + exception.getMessage());
        }
        mainCountDownLatch.countDown();








    }




}
